跳到主要内容

创建动作服务端(Python)

在 ROS2 中创建动作服务端可以使节点接收动作客户端发来的任务目标,执行长时间运行的任务,并实时返回进度反馈,最终返回任务的执行结果。本节通过实现经典的斐波那契数列(Fibonacci)动作服务端来展示动作服务端的创建方法。

Fibonacci 动作结构详解

在创建动作服务端和客户端之前,我们明确一下ROS 2中该动作的数据结构:

  • 目标请求

  • order: 10 客户端请求动作服务器计算长度为10的斐波那契数列。

  • 反馈信息partial_sequence: [0, 1, 1, 2, 3, 5] 动作服务器执行任务期间,实时向客户端提供进展情况。

  • 最终结果

    sequence: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34] 动作服务器计算完成后,最终返回给客户端的斐波那契数列结果。

创建功能包

首先,在 ROS2 工作空间中创建一个新的 Python 功能包,并引入动作相关的依赖项:

打开终端,执行:

cd ~/ros2_ws/src 
ros2 pkg create --build-type ament_python py_action_tutorial --dependencies rclpy action_tutorials_interfaces

在这段创建功能报的命令中使用 --dependencies 添加了如下两个功能包,这样后期就不需要配置 packages.xml 文件中的依赖项了:

  • rclpy:ROS 2 Python客户端库
  • action_tutorials_interfaces:包含动作定义(这里使用官方提供的Fibonacci动作接口)

动作服务端源代码

~/ros2_ws/src/py_action_tutorial/py_action_tutorial 目录下创建一个名为fibonacci_action_server.py 的Python文件,并粘贴以下代码:

import rclpy
from rclpy.action import ActionServer, GoalResponse, CancelResponse
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci

class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback,
goal_callback=self.goal_callback
)

def goal_callback(self,goal_request):
if goal_request.order<0:
self.get_logger().warn("Reject goal:order must>0")
return GoalResponse.REJECT
return GoalResponse.ACCEPT

def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]

for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i - 1]
)

self.get_logger().info(f'Feedback: {feedback_msg.partial_sequence}')
goal_handle.publish_feedback(feedback_msg)

goal_handle.succeed()

result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result

def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
fibonacci_action_server.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

详细代码解释(逐步讲解)

下面将逐步解释上述代码的每个部分:

导入所需的模块

import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci

  • rclpy:ROS 2 Python 客户端库,提供ROS系统的基础功能。
  • ActionServer:ROS 2动作服务器类,用于创建动作服务器节点。
  • Node:ROS 2节点的基础类,所有ROS 2节点都从这个类继承。
  • Fibonacci:ROS 2官方提供的动作类型,用于生成斐波那契数列。其结构包含目标、反馈和结果。

创建动作服务器节点类

class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_action_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback
)
  • 定义一个继承自Node的动作服务器类FibonacciActionServer

  • 在构造函数中,调用父类的构造函数super().__init__('fibonacci_action_server')来初始化节点名称。

  • 使用ActionServer创建一个动作服务器实例:

    • 参数依次是当前节点实例(self)、动作类型(Fibonacci)、动作的名称(fibonacci)、以及接收目标后执行任务的回调函数(execute_callback)。

定义执行动作目标的回调函数

def execute_callback(self, goal_handle):
self.get_logger().info('Executing goal...')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]

for i in range(1, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i] + feedback_msg.partial_sequence[i - 1]
)

self.get_logger().info(f'Feedback: {feedback_msg.partial_sequence}')
goal_handle.publish_feedback(feedback_msg)

goal_handle.succeed()

result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
  • 当客户端发送目标请求(如计算10个斐波那契数)后,此回调函数将自动被调用。

  • goal_handle.request.order:获取客户端请求的目标数据(生成数列的长度)。

  • 初始化反馈消息(feedback_msg),从 [0, 1] 开始生成斐波那契数列。

  • 在循环中,每计算出一个新的斐波那契数,就更新并发布反馈给客户端。

  • 完成计算后调用goal_handle.succeed()标记目标执行成功。

  • 创建结果消息(Fibonacci.Result),并将整个数列返回客户端。

主函数定义

def main(args=None):
rclpy.init(args=args)
fibonacci_action_server = FibonacciActionServer()
rclpy.spin(fibonacci_action_server)
fibonacci_action_server.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()
  • rclpy.init(args=args) 初始化ROS 2通信系统。

  • 创建动作服务器节点实例(fibonacci_action_server)。

  • rclpy.spin() 使节点持续运行,监听并响应客户端发来的动作目标请求。

  • 程序退出前调用destroy_node()显式释放节点资源。

  • rclpy.shutdown()关闭ROS 2通信系统。

创建动作客户端(Python)

上一节我们介绍了如何创建 ROS2 动作服务器节点,本节我们将学习如何创建相应的动作客户端节点,用于向动作服务器发送目标并实时接收反馈和最终结果。

动作客户端完整代码

在功能包目录 ~/ros2_ws/src/py_action_tutorial/py_action_tutorial 中,创建名为 fibonacci_action_client.py 的文件,并加入以下代码:

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci

class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order

self._action_client.wait_for_server()
self.get_logger().info('Action server available. Sending goal...')

self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)

self._send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected by server.')
return

self.get_logger().info('Goal accepted by server, waiting for result...')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(f'Result received: {result.sequence}')
rclpy.shutdown()

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback.partial_sequence}')

def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10) # 请求生成长度为10的斐波那契数列
rclpy.spin(action_client)

if __name__ == '__main__':
main()

详细代码解释(逐步讲解)

接下来我们逐步解释上述客户端代码每个部分的功能:

导入必要模块和接口

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from action_tutorials_interfaces.action import Fibonacci

  • rclpy 是ROS 2提供的Python客户端库。
  • ActionClient 是ROS 2动作客户端的基础类,用于与动作服务器通信。
  • Node 是所有ROS 2节点的基础类。
  • Fibonacci 是ROS 2提供的官方动作接口,定义了目标、反馈和结果的结构。

创建动作客户端节点类

class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_action_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
  • 创建一个名为FibonacciActionClient的类,继承自ROS 2的基础节点类Node

  • 在构造函数中,调用父类构造函数,指定节点名称为fibonacci_action_client

  • 创建动作客户端实例:

    • 第一个参数是节点实例自身(self)。

    • 第二个参数是动作接口类型(Fibonacci)。

    • 第三个参数是动作服务器名称(fibonacci),必须与服务端一致。

发送动作目标请求的方法(send_goal)

def send_goal(self, order):
goal_msg = Fibonacci.Goal()
goal_msg.order = order

self._action_client.wait_for_server()
self.get_logger().info('Action server available. Sending goal...')

self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
  • 创建目标消息 goal_msg,设置生成数列的长度为用户指定的参数 order

  • 调用wait_for_server()等待动作服务器上线。

  • 使用send_goal_async()异步发送目标,并提供反馈回调函数feedback_callback,用于实时接收服务器反馈。

  • 异步发送目标后返回一个future对象,并注册目标响应回调函数goal_response_callback

处理目标响应的回调函数(goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected by server.')
return

self.get_logger().info('Goal accepted by server, waiting for result...')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
  • 检查动作服务器是否接受了目标请求。

  • 若未接受,则打印日志提示用户。

  • 若接受,则调用get_result_async()异步获取最终结果。

  • 注册获取结果后的回调函数get_result_callback

接收并处理动作结果(get_result_callback)

def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(f'Result received: {result.sequence}')
rclpy.shutdown()
  • 当动作服务器完成任务并返回结果时,此函数被调用。

  • 打印动作的执行结果(斐波那契数列)。

  • 使用rclpy.shutdown()关闭ROS 2通信系统。

实时接收动作反馈(feedback_callback)

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Received feedback: {feedback.partial_sequence}')
  • 在动作执行过程中,每次服务器发送反馈消息时,都会调用此函数。

  • 打印当前数列生成的进度,让用户了解任务执行的实时状态。

定义客户端节点主函数(main)

def main(args=None):
rclpy.init(args=args)
action_client = FibonacciActionClient()
action_client.send_goal(10) # 发送目标请求生成长度为10的数列
rclpy.spin(action_client)
  • 初始化 ROS2 通信系统。

  • 创建动作客户端节点实例。

  • 调用send_goal(10)向服务器请求生成长度为10的斐波那契数列。

  • 使用rclpy.spin()启动事件循环,监听服务器反馈和结果,直至节点关闭。

ROS2 动作信息流程

图 2: ROS2 动作信息流程

结合图 2 和前面的代码,我们可以更清晰地梳理 ROS2 动作(Action)的通信流程。

在服务端,动作服务器实现了两个关键回调函数:

  • goal_callback():用于处理客户端发送的目标请求,决定是否接受该目标。

  • execute_callback():当目标被接受后,服务器在此函数中执行具体计算,并在过程中周期性发布反馈,最终返回结果。

在客户端,通信过程如下:

  1. wait_for_server():客户端首先调用此函数等待动作服务器上线。如果服务器不可用,程序会阻塞在此处。

  2. send_goal_async():服务器可用后,客户端通过该异步函数发送目标请求。为了在请求完成后自动处理结果,客户端会使用 add_done_callback()goal_response_callback() 注册为完成回调函数。

  3. goal_response_callback():当服务器返回目标受理结果(接受或拒绝)时,该回调会自动被触发。如果目标被接受,回调内部会调用 get_result_async() 异步请求最终计算结果。同时,客户端在目标执行期间会收到服务器通过 feedback_callback() 发布的反馈信息。

  4. get_result_callback():当服务器完成执行并返回最终结果时,客户端会自动调用此回调,从而获得动作的最终结果。

构建与运行动作节点

由于在创建功能包的时候已经添加了 --dependencies 的依赖项,因此这里不需要对依赖项进行重新配置,只需要配置 setup.py 文件即可:

编辑 py_action_tutorial/setup.py,在 entry_points 中添加:

entry_points={
'console_scripts': [
'fibonacci_action_server=py_action_tutorial.fibonacci_action_server:main',
'fibonacci_action_client=py_action_tutorial.fibonacci_action_client:main',
],
},

构建功能包:

cd ~/ros2_ws
colcon build --packages-select py_action_tutorial

加载环境变量: source install/setup.bash

确保动作服务器已启动: ros2 run py_action_tutorial fibonacci_action_server

再启动客户端节点: ros2 run py_action_tutorial fibonacci_action_client

客户端终端示例输出:

[INFO] [fibonacci_action_client]: Action server available. Sending goal...
[INFO] [fibonacci_action_client]: Goal accepted by server, waiting for result...
[INFO] [fibonacci_action_client]: Received feedback: [0, 1]
[INFO] [fibonacci_action_client]: Received feedback: [0, 1, 1]
[INFO] [fibonacci_action_client]: Received feedback: [0, 1, 1, 2]
...
[INFO] [fibonacci_action_client]: Result received: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

小结

本章介绍了 ROS2 中的动作(Action)机制,说明了它在处理 长时间运行、需要实时反馈且可中途取消的任务 时的优势。通过对 .action 文件结构、通信流程以及动作客户端与服务器角色的讲解。

在实践部分,本章通过 Fibonacci 示例展示了如何编写 Python 动作服务器与客户端,并介绍了常用的 ROS2 动作命令,帮助读者在实际项目中调试和使用 Action 接口。

通过本章的学习,读者已经具备在 ROS2 中使用 Action 实现复杂任务控制的能力,为进一步学习导航、机械臂动作等高级功能打下基础。